package gpol

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/go-logr/logr"
	"github.com/kyverno/kyverno/api/kyverno"
	kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
	"github.com/kyverno/kyverno/pkg/background/common"
	"github.com/kyverno/kyverno/pkg/clients/dclient"
	reportutils "github.com/kyverno/kyverno/pkg/utils/report"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/tools/cache"
	watchTools "k8s.io/client-go/tools/watch"
)

// Resource is the value stored in a watcher's metadata cache (keyed by the
// resource UID) for a resource handed to the WatchManager.
//
// Name, Namespace and Labels are copied from the object's metadata when the
// entry is created. Hash is the result of reportutils.CalculateResourceHash
// for the object at that time and is compared against later versions of the
// object to detect modifications. Data points to the object itself.
type Resource struct {
	Name      string
	Namespace string
	Hash      string
	Labels    map[string]string
	Data      *unstructured.Unstructured
}

// WatchManager keeps one watcher per GroupVersionResource of the resources
// generated by policies, together with the bookkeeping needed to know which
// policy references which watcher.
type WatchManager struct {
	// client is used to watch, list, create, update and delete resources.
	client dclient.Interface

	// restMapper resolves the GroupVersionKind of a generated resource to its GroupVersionResource.
	restMapper meta.RESTMapper

	// dynamicWatchers maps the GVR of generated resources to the watcher started for that GVR.
	dynamicWatchers map[schema.GroupVersionResource]*watcher
	// policyRefs maps a policy name to the GVRs of the resources it generates.
	policyRefs map[string][]schema.GroupVersionResource
	// refCount tracks the number of policies that generate resources of the same GVR.
	refCount map[schema.GroupVersionResource]int

	// log is the base logger. lock is acquired by SyncWatchers, DeleteDownstreams,
	// RemoveWatchersForPolicy, StopWatchers, handleUpdate and handleDelete while
	// they access the maps above.
	log  logr.Logger
	lock sync.Mutex
}

// watcher pairs the watch interface opened for one GVR with the cache of the
// resources tracked for that GVR.
type watcher struct {
	// watcher is the underlying watch; Stop is called on it when the GVR is no longer referenced.
	watcher       watch.Interface
	// metadataCache holds the tracked resources, keyed by their UID.
	metadataCache map[types.UID]Resource
}

// NewWatchManager returns a WatchManager with empty watcher, policy-reference
// and reference-count maps, and a REST mapper built from the API group
// resources reported by the client's discovery interface.
//
// The constructor has no error result, so a discovery failure is logged
// instead of being dropped silently; the REST mapper is then built from
// whatever discovery returned.
func NewWatchManager(log logr.Logger, client dclient.Interface) *WatchManager {
	apiGroupResources, err := restmapper.GetAPIGroupResources(client.GetKubeClient().Discovery())
	if err != nil {
		log.Error(err, "failed to get API group resources")
	}
	return &WatchManager{
		log:             log,
		client:          client,
		restMapper:      restmapper.NewDiscoveryRESTMapper(apiGroupResources),
		dynamicWatchers: map[schema.GroupVersionResource]*watcher{},
		policyRefs:      map[string][]schema.GroupVersionResource{},
		refCount:        map[schema.GroupVersionResource]int{},
	}
}

// SyncWatchers reconciles the watchers referenced by policyName with the
// resources the policy currently generates.
//
// For every entry of generatedResources the GVR is resolved through the REST
// mapper, a watcher is started for that GVR if none exists yet, and the
// resource is recorded in the watcher's metadata cache. For every GVR that was
// previously recorded for the policy but is absent from generatedResources,
// the reference count is decremented, the cached resources labelled with this
// policy are deleted, and the watcher is stopped once no policy references it.
// Finally the GVR list recorded for the policy is replaced with the new set.
//
// An error is returned when a GVK cannot be mapped to a GVR or when a watcher
// cannot be started. In that case watchers already started by this call stay
// registered, but the reference counts and the policy's GVR list are not updated.
func (wm *WatchManager) SyncWatchers(policyName string, generatedResources []*unstructured.Unstructured) error {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	logger := wm.log
	newGVRs := make(map[schema.GroupVersionResource]bool)
	// make sure a watcher exists for each generated resource
	for _, resource := range generatedResources {
		gvk := resource.GroupVersionKind()
		mapping, err := wm.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
		if err != nil {
			// %w keeps the mapper error inspectable through errors.Is / errors.As
			return fmt.Errorf("failed to map gvk to gvr %s (%w)", gvk, err)
		}

		gvr := mapping.Resource
		logger.V(2).Info("processing generated resource", "gvr", gvr, "name", resource.GetName())
		newGVRs[gvr] = true

		// if the watcher for this GVR already exists, only record the resource
		if existing := wm.dynamicWatchers[gvr]; existing != nil {
			logger.V(2).Info("watcher already exists for GVR", "gvr", gvr)
			existing.metadataCache[resource.GetUID()] = Resource{
				Name:      resource.GetName(),
				Namespace: resource.GetNamespace(),
				Labels:    resource.GetLabels(),
				Hash:      reportutils.CalculateResourceHash(*resource),
				Data:      resource,
			}
			continue
		}

		// start a new watcher for this GVR; startWatcher seeds the metadata
		// cache with the resource, so it is not stored a second time here
		w, err := wm.startWatcher(resource, gvr)
		if err != nil {
			return err
		}
		wm.dynamicWatchers[gvr] = w
	}

	oldGVRs := wm.policyRefs[policyName]
	previous := make(map[schema.GroupVersionResource]struct{}, len(oldGVRs))
	for _, oldGVR := range oldGVRs {
		previous[oldGVR] = struct{}{}
	}
	// take a reference on every GVR the policy did not reference before
	for gvr := range newGVRs {
		if _, found := previous[gvr]; !found {
			wm.refCount[gvr]++
		}
	}

	// release the GVRs that are not in the new list:
	// it means that the policy no longer generates those resources.
	for _, oldGVR := range oldGVRs {
		if newGVRs[oldGVR] {
			continue
		}
		wm.refCount[oldGVR]--

		oldWatcher, exists := wm.dynamicWatchers[oldGVR]
		if !exists {
			continue
		}

		// collect the old resources generated by this policy
		var uidsToDelete []types.UID
		for uid, resource := range oldWatcher.metadataCache {
			// identify resources managed by this specific policy via a label
			if resource.Labels[common.GeneratePolicyLabel] == policyName {
				uidsToDelete = append(uidsToDelete, uid)
			}
		}

		for _, uid := range uidsToDelete {
			res := oldWatcher.metadataCache[uid]
			logger.V(4).Info("deleting downstream resource", "kind", res.Data.GetKind(), "name", res.Name, "namespace", res.Namespace)
			err := wm.client.DeleteResource(context.TODO(), res.Data.GetAPIVersion(), res.Data.GetKind(), res.Namespace, res.Name, false, metav1.DeleteOptions{})
			if err != nil {
				// keep the cache entry so the resource stays tracked
				logger.Error(err, "failed to delete downstream resource", "name", res.Name, "namespace", res.Namespace)
				continue
			}
			logger.V(4).Info("downstream resource deleted", "name", res.Name, "namespace", res.Namespace)
			// remove the resource from the metadata cache upon successful deletion
			delete(oldWatcher.metadataCache, uid)
		}

		// stop the watcher if no other policy is using it
		if wm.refCount[oldGVR] <= 0 {
			oldWatcher.watcher.Stop()
			delete(wm.dynamicWatchers, oldGVR)
			delete(wm.refCount, oldGVR)
		}
	}

	// update the policyRefs with the new GVRs
	refs := make([]schema.GroupVersionResource, 0, len(newGVRs))
	for gvr := range newGVRs {
		refs = append(refs, gvr)
	}
	wm.policyRefs[policyName] = refs
	return nil
}

// GetDownstreams retrieves all downstream resources generated by a specific policy.
//
// It walks the GVRs recorded for policyName and returns a deep copy of every
// cached resource whose generate-policy label equals policyName, so callers can
// modify the result without touching the cache. It returns nil when the policy
// has no recorded GVRs.
//
// The lock is held for the duration of the call: the maps read here are
// mutated by the other methods and by the watch event handlers.
func (wm *WatchManager) GetDownstreams(policyName string) []*unstructured.Unstructured {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	logger := wm.log.WithValues("policyName", policyName)
	logger.V(4).Info("getting downstream resources for policy")

	gvrList, exists := wm.policyRefs[policyName]
	if !exists {
		logger.V(4).Info("no watchers found for policy")
		return nil
	}

	var downstreams []*unstructured.Unstructured
	for _, gvr := range gvrList {
		w, watcherExists := wm.dynamicWatchers[gvr]
		if !watcherExists {
			continue
		}

		for _, resource := range w.metadataCache {
			// identify resources managed by this specific policy via a label
			if resource.Labels[common.GeneratePolicyLabel] != policyName {
				continue
			}
			logger.V(4).Info("found downstream resource", "kind", resource.Data.GetKind(), "name", resource.Name, "namespace", resource.Namespace)
			// create a copy of the resource to avoid modifying the original one
			downstreams = append(downstreams, resource.Data.DeepCopy())
		}
	}
	return downstreams
}

// DeleteDownstreams deletes the downstream resources generated by a policy in the following cases:
// 1. When the trigger resource is deleted.
// 2. When the policy is updated to generate different resources.
// With a nil trigger every cached resource labelled with the policy is deleted;
// otherwise only those whose trigger UID label equals trigger.UID.
// Watchers are left running. A resource is removed from the metadata cache only
// after its deletion succeeded.
func (wm *WatchManager) DeleteDownstreams(policyName string, trigger *kyvernov1.ResourceSpec) {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	policyLogger := wm.log.WithValues("policyName", policyName)
	policyLogger.V(2).Info("deleting downstream resources for policy")

	// ranging over the entry of an unknown policy yields no iterations
	for _, gvr := range wm.policyRefs[policyName] {
		gvrLogger := policyLogger.WithValues("gvr", gvr)
		w, found := wm.dynamicWatchers[gvr]
		if !found {
			continue
		}

		// first pass: pick the cache entries that belong to this policy (and trigger)
		var pending []types.UID
		for uid, cached := range w.metadataCache {
			if cached.Labels[common.GeneratePolicyLabel] != policyName {
				continue
			}
			if trigger != nil && cached.Labels[common.GenerateTriggerUIDLabel] != string(trigger.UID) {
				continue
			}
			pending = append(pending, uid)
		}

		// second pass: delete them from the cluster, then from the cache
		for _, uid := range pending {
			target := w.metadataCache[uid]
			gvrLogger.V(4).Info("deleting downstream resource", "kind", target.Data.GetKind(), "name", target.Name, "namespace", target.Namespace)
			if err := wm.client.DeleteResource(context.TODO(), target.Data.GetAPIVersion(), target.Data.GetKind(), target.Namespace, target.Name, false, metav1.DeleteOptions{}); err != nil {
				gvrLogger.Error(err, "failed to delete downstream resource", "name", target.Name, "namespace", target.Namespace)
				continue
			}
			gvrLogger.V(4).Info("downstream resource deleted", "name", target.Name, "namespace", target.Namespace)
			delete(w.metadataCache, uid)
		}
	}
}

// RemoveWatchersForPolicy drops everything recorded for policyName: the cache
// entries labelled with the policy, its reference on each watcher, and its GVR list.
// When deleteDownstream is true, cached resources that carry no source UID
// label are also deleted from the cluster. A watcher is stopped once its
// reference count reaches zero.
func (wm *WatchManager) RemoveWatchersForPolicy(policyName string, deleteDownstream bool) {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	policyLogger := wm.log.WithValues("policyName", policyName)
	policyLogger.V(2).Info("removing policy resources and watchers")

	gvrList, tracked := wm.policyRefs[policyName]
	if !tracked {
		policyLogger.V(4).Info("no watchers found for policy")
		return
	}

	for _, gvr := range gvrList {
		gvrLogger := policyLogger.WithValues("gvr", gvr)
		w, found := wm.dynamicWatchers[gvr]
		if !found {
			continue
		}

		// gather the cache entries owned by this policy
		var owned []types.UID
		for uid, cached := range w.metadataCache {
			if cached.Labels[common.GeneratePolicyLabel] == policyName {
				owned = append(owned, uid)
			}
		}

		for _, uid := range owned {
			entry := w.metadataCache[uid]
			// only delete from the cluster when requested and when the
			// resource has no source UID label
			_, hasSource := entry.Labels[common.GenerateSourceUIDLabel]
			if deleteDownstream && !hasSource {
				gvrLogger.V(4).Info("deleting downstream resource", "kind", entry.Data.GetKind(), "name", entry.Name, "namespace", entry.Namespace)
				if err := wm.client.DeleteResource(context.TODO(), entry.Data.GetAPIVersion(), entry.Data.GetKind(), entry.Namespace, entry.Name, false, metav1.DeleteOptions{}); err != nil {
					gvrLogger.Error(err, "failed to delete downstream resource", "name", entry.Name, "namespace", entry.Namespace)
				} else {
					gvrLogger.V(4).Info("downstream resource deleted", "name", entry.Name, "namespace", entry.Namespace)
				}
			}
			// the cache entry goes away regardless of the deletion outcome
			delete(w.metadataCache, uid)
		}

		// release this policy's reference; keep the watcher while others remain
		remaining := wm.refCount[gvr] - 1
		wm.refCount[gvr] = remaining
		if remaining > 0 {
			continue
		}
		gvrLogger.V(3).Info("stopping watcher as it has no more references")
		w.watcher.Stop()
		delete(wm.dynamicWatchers, gvr)
		delete(wm.refCount, gvr)
	}

	// forget the policy itself
	delete(wm.policyRefs, policyName)
}

// StopWatchers stops every registered watcher and resets the watcher,
// policy-reference and reference-count maps to empty maps.
func (wm *WatchManager) StopWatchers() {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	for gvr := range wm.dynamicWatchers {
		wm.dynamicWatchers[gvr].watcher.Stop()
	}

	// start over with fresh, empty bookkeeping
	wm.dynamicWatchers = make(map[schema.GroupVersionResource]*watcher)
	wm.policyRefs = make(map[string][]schema.GroupVersionResource)
	wm.refCount = make(map[schema.GroupVersionResource]int)
}

// startWatcher starts a new watcher for the given resource and GVR.
//
// The returned watcher's metadata cache is seeded with the resource, and the
// watch is opened through a retry watcher starting at the resource's
// resourceVersion. A goroutine consumes the watch events until the result
// channel is closed and dispatches Added, Modified and Deleted events to
// handleAdd, handleUpdate and handleDelete; Error events are logged.
//
// It returns the watcher, or an error if the retry watcher could not be created.
func (wm *WatchManager) startWatcher(resource *unstructured.Unstructured, gvr schema.GroupVersionResource) (*watcher, error) {
	metadataCache := map[types.UID]Resource{
		resource.GetUID(): {
			Name:      resource.GetName(),
			Namespace: resource.GetNamespace(),
			Labels:    resource.GetLabels(),
			Hash:      reportutils.CalculateResourceHash(*resource),
			Data:      resource,
		},
	}
	resourceVersion := resource.GetResourceVersion()
	logger := wm.log.WithValues("resourceVersion", resourceVersion)
	logger.V(2).Info("start watcher ...")
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		logger.V(3).Info("creating watcher...")
		// named wi so the watch package is not shadowed
		wi, err := wm.client.GetDynamicInterface().Resource(gvr).Watch(context.Background(), options)
		if err != nil {
			logger.Error(err, "failed to watch")
		}
		return wi, err
	}
	watchInterface, err := watchTools.NewRetryWatcherWithContext(context.TODO(), resourceVersion, &cache.ListWatch{WatchFunc: watchFunc})
	if err != nil {
		logger.Error(err, "failed to create watcher")
		return nil, err
	}

	w := &watcher{
		watcher:       watchInterface,
		metadataCache: metadataCache,
	}
	go func() {
		defer logger.V(2).Info("watcher stopped")
		for event := range watchInterface.ResultChan() {
			var handle func(*unstructured.Unstructured, schema.GroupVersionResource)
			switch event.Type {
			case watch.Added:
				handle = wm.handleAdd
			case watch.Modified:
				handle = wm.handleUpdate
			case watch.Deleted:
				handle = wm.handleDelete
			case watch.Error:
				logger.Error(errors.New("watch error event received"), "watch error event received", "event", event.Object)
				continue
			default:
				// other event types are ignored
				continue
			}
			// a checked assertion keeps an unexpected payload from panicking the goroutine
			obj, ok := event.Object.(*unstructured.Unstructured)
			if !ok {
				logger.Info("skipping watch event with unexpected object type", "eventType", event.Type, "objectType", fmt.Sprintf("%T", event.Object))
				continue
			}
			handle(obj, gvr)
		}
	}()
	return w, nil
}

// handleAdd logs the name of the object carried by an Added watch event.
// The gvr argument is not used.
func (wm *WatchManager) handleAdd(obj *unstructured.Unstructured, gvr schema.GroupVersionResource) {
	name := obj.GetName()
	wm.log.Info("Resource added", "name", name)
}

// handleUpdate processes a Modified watch event for obj, observed by the
// watcher registered for gvr. Nothing happens when no watcher is registered.
//
// If obj is in the metadata cache it is a tracked downstream: when its hash
// differs from the cached hash, the cluster object is overwritten with the
// cached version.
//
// Otherwise obj is treated as a possible source: the resources of the same
// kind carrying obj's UID in the source UID label are listed, and every one
// that is present in the metadata cache is updated with obj's content (keeping
// its own name, namespace, kind, apiVersion and labels) and re-cached.
func (wm *WatchManager) handleUpdate(obj *unstructured.Unstructured, gvr schema.GroupVersionResource) {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	wm.log.Info("Resource updated", "name", obj.GetName())
	w, exists := wm.dynamicWatchers[gvr]
	if !exists {
		return
	}

	uid := obj.GetUID()
	if cached, ok := w.metadataCache[uid]; ok {
		// the resource is in the cache: it is a downstream resource with sync enabled.
		// a different hash means it has been changed and must be reverted.
		hash := reportutils.CalculateResourceHash(*obj)
		if hash == cached.Hash {
			return
		}
		wm.log.V(4).Info("downstream resource updated by user, reverting changes", "name", obj.GetName(), "namespace", obj.GetNamespace())
		// work on a copy so the cached object is not stripped of its metadata
		downstream := cached.Data.DeepCopy()
		// clean up parameters that shouldn't be copied
		downstream.SetUID("")
		downstream.SetSelfLink("")
		downstream.SetCreationTimestamp(metav1.Time{})
		downstream.SetManagedFields(nil)
		downstream.SetResourceVersion("")
		if _, err := wm.client.UpdateResource(context.TODO(), downstream.GetAPIVersion(), downstream.GetKind(), downstream.GetNamespace(), downstream, false); err != nil {
			wm.log.Error(err, "failed to revert downstream resource", "name", obj.GetName(), "namespace", obj.GetNamespace())
		} else {
			wm.log.V(4).Info("downstream resource reverted", "name", obj.GetName(), "namespace", obj.GetNamespace())
		}
		return
	}

	// the resource is not in the cache, it is one of the following:
	// - the source resource with sync enabled, and the downstream resources must be updated.
	// - the source resource with sync disabled, and nothing is done.
	// - the downstream resource with sync disabled, and nothing is done.
	source := obj.DeepCopy()
	// clean up parameters that shouldn't be copied
	source.SetUID("")
	source.SetSelfLink("")
	source.SetCreationTimestamp(metav1.Time{})
	source.SetManagedFields(nil)
	source.SetResourceVersion("")
	// fetch downstreams that have the source UID label.
	selector := &metav1.LabelSelector{MatchLabels: map[string]string{
		common.GenerateSourceUIDLabel: string(uid),
	}}
	downstreams, err := wm.client.ListResource(context.TODO(), obj.GetAPIVersion(), obj.GetKind(), "", selector)
	if err != nil {
		// the list result must not be dereferenced after a failure
		wm.log.Error(err, "failed to list downstream resources")
		return
	}
	for i := range downstreams.Items {
		downstream := &downstreams.Items[i]
		// if the downstream doesn't exist in the metadata cache, it means sync is disabled.
		if _, tracked := w.metadataCache[downstream.GetUID()]; !tracked {
			continue
		}
		// every downstream gets its own copy of the source content; sharing one
		// map would let a later downstream overwrite the name, namespace and
		// labels of the objects cached for earlier ones.
		newResource := source.DeepCopy()
		newResource.SetName(downstream.GetName())
		newResource.SetNamespace(downstream.GetNamespace())
		newResource.SetKind(downstream.GetKind())
		newResource.SetAPIVersion(downstream.GetAPIVersion())
		newResource.SetLabels(downstream.GetLabels())
		if _, err := wm.client.UpdateResource(context.TODO(), downstream.GetAPIVersion(), downstream.GetKind(), downstream.GetNamespace(), newResource, false); err != nil {
			wm.log.Error(err, "failed to update downstream resource", "name", downstream.GetName(), "namespace", downstream.GetNamespace())
			continue
		}
		wm.log.V(4).Info("downstream resource updated", "name", downstream.GetName(), "namespace", downstream.GetNamespace())
		hash := reportutils.CalculateResourceHash(*newResource)
		// update the metadata cache for the downstream resource
		w.metadataCache[downstream.GetUID()] = Resource{
			Name:      downstream.GetName(),
			Namespace: downstream.GetNamespace(),
			Labels:    downstream.GetLabels(),
			Hash:      hash,
			Data:      newResource,
		}
		wm.log.V(4).Info("resource metadata updated", "name", downstream.GetName(), "namespace", downstream.GetNamespace(), "hash", hash)
	}
}

// handleDelete processes a Deleted watch event for obj, observed by the
// watcher registered for gvr. Nothing happens when no watcher is registered.
//
// If obj carries the Kyverno managed-by label and is in the metadata cache, it
// is recreated from the cached version.
//
// If obj does not carry the managed-by label it is treated as a source: the
// resources of the same kind carrying obj's UID in the source UID label are
// listed, and every one that is present in the metadata cache is deleted from
// the cluster and removed from the cache.
func (wm *WatchManager) handleDelete(obj *unstructured.Unstructured, gvr schema.GroupVersionResource) {
	wm.lock.Lock()
	defer wm.lock.Unlock()

	wm.log.Info("Resource deleted", "name", obj.GetName())
	w, exists := wm.dynamicWatchers[gvr]
	if !exists {
		return
	}

	uid := obj.GetUID()
	if obj.GetLabels()[kyverno.LabelAppManagedBy] == kyverno.ValueKyvernoApp {
		cached, ok := w.metadataCache[uid]
		if !ok {
			return
		}
		wm.log.V(4).Info("downstream resource deleted", "name", obj.GetName(), "namespace", obj.GetNamespace())
		// the resource is in the cache: it is a tracked downstream that has been
		// deleted and is created again. Work on a copy so the cached object is
		// not stripped of its metadata.
		downstream := cached.Data.DeepCopy()
		// clean up parameters that shouldn't be copied
		downstream.SetUID("")
		downstream.SetSelfLink("")
		downstream.SetCreationTimestamp(metav1.Time{})
		downstream.SetManagedFields(nil)
		downstream.SetResourceVersion("")
		if _, err := wm.client.CreateResource(context.TODO(), downstream.GetAPIVersion(), downstream.GetKind(), downstream.GetNamespace(), downstream, false); err != nil {
			wm.log.Error(err, "failed to revert downstream resource", "name", obj.GetName(), "namespace", obj.GetNamespace())
		} else {
			wm.log.V(4).Info("downstream resource reverted", "name", obj.GetName(), "namespace", obj.GetNamespace())
		}
		return
	}

	// the source resource is deleted, we need to clean up the downstream resources.
	wm.log.V(4).Info("source resource deleted, removing downstreams", "name", obj.GetName(), "namespace", obj.GetNamespace())
	// fetch downstreams that have the source UID label.
	selector := &metav1.LabelSelector{MatchLabels: map[string]string{
		common.GenerateSourceUIDLabel: string(uid),
	}}
	downstreams, err := wm.client.ListResource(context.TODO(), obj.GetAPIVersion(), obj.GetKind(), "", selector)
	if err != nil {
		wm.log.Error(err, "failed to list downstream resources")
		return
	}
	for i := range downstreams.Items {
		downstream := &downstreams.Items[i]
		// if the downstream doesn't exist in the metadata cache, it means sync is disabled.
		if _, tracked := w.metadataCache[downstream.GetUID()]; !tracked {
			continue
		}
		if err := wm.client.DeleteResource(context.TODO(), downstream.GetAPIVersion(), downstream.GetKind(), downstream.GetNamespace(), downstream.GetName(), false, metav1.DeleteOptions{}); err != nil {
			wm.log.Error(err, "failed to delete downstream resource", "name", downstream.GetName(), "namespace", downstream.GetNamespace())
		} else {
			wm.log.V(4).Info("downstream resource deleted", "name", downstream.GetName(), "namespace", downstream.GetNamespace())
		}
		// remove the resource from the metadata cache
		delete(w.metadataCache, downstream.GetUID())
	}
}
